{
  "metadata" : {
    "name" : "Flow Example",
    "user_save_timestamp" : "2134-02-01T01:00:00.000Z",
    "auto_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "customLocalRepo" : null,
    "customRepos" : null,
    "customDeps" : null,
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : null
  },
  "cells" : [ {
    "metadata" : {
      "id" : "31A56903BC86417696A237FDB13D9FA1"
    },
    "cell_type" : "markdown",
    "source" : "# Extensible and Reactive Flow Manager"
  }, {
    "metadata" : {
      "id" : "F2270767DD8844E89C428DBDD25421CB"
    },
    "cell_type" : "markdown",
    "source" : "This example shows the interactivity and reactivity features powered by the Spark Notebook.\n\nWe can draw default boxes and links, update them with scala code (their logic) and run the whole pipeline at the end to get the results back in a table.\n\nWe even can create personal boxes, and include them in the UI and then in a pipeline. See at the end (`SumPipeComp`).\n\nNow imagine all the possibilities:\n* porting the whole MLlib, pipeline, keystone\n* or H2O\n* or just regular scala\n* what about adding custom boxes for akka actors, using links to send messages?"
  }, {
    "metadata" : {
      "id" : "2C7B4B5B082D4ACC897B7125F08F304D"
    },
    "cell_type" : "markdown",
    "source" : "Import the components we might use (for information)."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "13F8BAED404748468FE64FE43F752E6F"
    },
    "cell_type" : "code",
    "source" : "import notebook.front.widgets.{BoxPipeComponent, PipeComponent, CustomizableBoxPipe}",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "48626136C8A247C89DEFDB17914B49C1"
    },
    "cell_type" : "markdown",
    "source" : "## Spark Example"
  }, {
    "metadata" : {
      "id" : "592E66DA5F1645589CD00A90662B362E"
    },
    "cell_type" : "markdown",
    "source" : "Just **double click** on the boxes to see the configuration, including the **scala** code to customize the box's logic. "
  }, {
    "metadata" : {
      "id" : "ADBA1045C7944A8B93E595E0A0AD6876"
    },
    "cell_type" : "markdown",
    "source" : "The following `Flow` instance will load some preconfiguration from the cell's metadata which combines those functions together.\n\n\n```scala\n(a:Map[String, Any])=>Map(\"even\" → (a(\"in\").asInstanceOf[RDD[Int]].filter(_ % 2 == 0)).map(x=>(x.toString, x))\n```\n\n```scala\n(a:Map[String, Any])=>Map(\"square\" → (a(\"in\").asInstanceOf[RDD[Int]].map(x => (x.toString, x*x)))\n```\n\n```scala\n(a:Map[String, Any])=> Map(\"join\" → (a(\"left\").asInstanceOf[RDD[(String, Int)]].join(a(\"right\").asInstanceOf[RDD[(String, Int)]]))\n```\n\n```scala\n(a:Map[String, Any])=> Map(\"collect\" → (a(\"in\").asInstanceOf[RDD[_]].collect)\n```"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "output_stream_collapsed" : true,
      "collapsed" : false,
      "id" : "3B6A35184D2048E78E8260D25400FB92",
      "extra" : {
        "state" : [ {
          "name" : "customizable",
          "id" : "58379e31-1d09-4e85-983c-315f07f4119d",
          "tpe" : "box",
          "parameters" : {
            "next" : "(a:Map[String, Any])=>Map(\"even\" → (a(\"in\").asInstanceOf[RDD[Int]].filter(_ % 2 == 0)).map(x=>(x.toString, x)))"
          },
          "inPorts" : [ "in" ],
          "outPorts" : [ "even" ],
          "position" : {
            "x" : 107,
            "y" : 139
          },
          "size" : {
            "width" : 100,
            "height" : 60
          },
          "remove" : false
        }, {
          "name" : "link",
          "id" : "024adad2-a395-4793-b488-3d1abda983a1",
          "tpe" : "link",
          "parameters" : {
            "source_id" : "58379e31-1d09-4e85-983c-315f07f4119d",
            "source_port" : "even",
            "target_id" : "bf167976-f3a7-447b-a250-d8447b5026af",
            "target_port" : "left"
          },
          "remove" : false
        }, {
          "name" : "customizable",
          "id" : "9c6d5011-3a88-4928-b264-a230d260bf75",
          "tpe" : "box",
          "parameters" : {
            "next" : "(a:Map[String, Any])=>Map(\"square\" → (a(\"in\").asInstanceOf[RDD[Int]].map(x => (x.toString, x*x))))"
          },
          "inPorts" : [ "in" ],
          "outPorts" : [ "square" ],
          "position" : {
            "x" : 63,
            "y" : 433
          },
          "size" : {
            "width" : 100,
            "height" : 60
          },
          "remove" : false
        }, {
          "name" : "link",
          "id" : "74bf5bd3-623a-4761-b8f9-78d881ba8132",
          "tpe" : "link",
          "parameters" : {
            "source_id" : "9c6d5011-3a88-4928-b264-a230d260bf75",
            "source_port" : "square",
            "target_id" : "bf167976-f3a7-447b-a250-d8447b5026af",
            "target_port" : "right"
          },
          "remove" : false
        }, {
          "name" : "customizable",
          "id" : "bf167976-f3a7-447b-a250-d8447b5026af",
          "tpe" : "box",
          "parameters" : {
            "next" : "(a:Map[String, Any])=> Map(\"join\" → (a(\"left\").asInstanceOf[RDD[(String, Int)]].join(a(\"right\").asInstanceOf[RDD[(String, Int)]])))"
          },
          "inPorts" : [ "left", "right" ],
          "outPorts" : [ "join" ],
          "position" : {
            "x" : 281,
            "y" : 259
          },
          "size" : {
            "width" : 100,
            "height" : 60
          },
          "remove" : false
        }, {
          "name" : "customizable",
          "id" : "7a0cfacd-4cbf-4d6d-977f-d8fb83afb428",
          "tpe" : "box",
          "parameters" : {
            "next" : "(a:Map[String, Any])=>  Map(\"collect\" -> a(\"in\").asInstanceOf[RDD[(String, (Int, Int))]].collect)"
          },
          "inPorts" : [ "in" ],
          "outPorts" : [ "collect" ],
          "position" : {
            "x" : 611,
            "y" : 267
          },
          "size" : {
            "width" : 100,
            "height" : 60
          },
          "remove" : false
        }, {
          "name" : "link",
          "id" : "5ffa488a-799f-49a6-90d7-35ed3809fabd",
          "tpe" : "link",
          "parameters" : {
            "source_id" : "bf167976-f3a7-447b-a250-d8447b5026af",
            "source_port" : "join",
            "target_id" : "7a0cfacd-4cbf-4d6d-977f-d8fb83afb428",
            "target_port" : "in"
          },
          "remove" : false
        } ]
      }
    },
    "cell_type" : "code",
    "source" : "val f:Flow = Flow()\nf",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "4124424C8C4C43388FDED44E55A87B26"
    },
    "cell_type" : "markdown",
    "source" : "The following is updating the current `Flow` to interprete the codes that were assigned to the `CustomizableBoxPipe` instances."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "presentation" : {
        "pivot_chart_state" : "{\n  \"hiddenAttributes\": [],\n  \"menuLimit\": 200,\n  \"cols\": [],\n  \"rows\": [],\n  \"vals\": [],\n  \"exclusions\": {},\n  \"inclusions\": {},\n  \"unusedAttrsVertical\": 85,\n  \"autoSortUnusedAttrs\": false,\n  \"inclusionsInfo\": {},\n  \"aggregatorName\": \"Count\",\n  \"rendererName\": \"Table\"\n}"
      },
      "id" : "DBE7A5107E7444FDA8AC20C9128FA6AE"
    },
    "cell_type" : "code",
    "source" : "f.update(\"f\",  $intp)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "E6ABCC06FAD74251868479B9F5EE053A"
    },
    "cell_type" : "markdown",
    "source" : "Then we can fill the sources with some values"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "60336DB434604C41A561BA66127F97EF"
    },
    "cell_type" : "code",
    "source" : "val result = f.run {\n  case (\"58379e31-1d09-4e85-983c-315f07f4119d\", _) => Map(\"in\" → sparkContext.parallelize(1 to 100))\n  case (\"9c6d5011-3a88-4928-b264-a230d260bf75\", _) => Map(\"in\" → sparkContext.parallelize(2 to 200 by 2))\n  case _ => Map.empty\n}",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "0BB5556F70BB4BF1874BD6CCCDC0136A"
    },
    "cell_type" : "markdown",
    "source" : "Finally, we can get the final result out of the last box."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "presentation" : {
        "pivot_chart_state" : "{\n  \"hiddenAttributes\": [],\n  \"menuLimit\": 200,\n  \"cols\": [],\n  \"rows\": [],\n  \"vals\": [],\n  \"exclusions\": {},\n  \"inclusions\": {},\n  \"unusedAttrsVertical\": 85,\n  \"autoSortUnusedAttrs\": false,\n  \"inclusionsInfo\": {},\n  \"aggregatorName\": \"Count\",\n  \"rendererName\": \"Table\"\n}",
        "tabs_state" : "{\n  \"tab_id\": \"#tab645362716-0\"\n}"
      },
      "id" : "9AE974F1BB434337AEA0967982BBADE9"
    },
    "cell_type" : "code",
    "source" : "val array:Array[(String, Int, Int)] = result((\"7a0cfacd-4cbf-4d6d-977f-d8fb83afb428\", \"collect\")).\n                                      asInstanceOf[Array[(String, (Int, Int))]].\n                                      map{ case (i, (j,k)) => (i,j,k) }",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "77C22C601FCD4463B9C5373AA51371DA"
    },
    "cell_type" : "markdown",
    "source" : "## Extending Capabilities: new box type"
  }, {
    "metadata" : {
      "id" : "C8A07C496E6B4A2D9FB42ADFD8838BC9"
    },
    "cell_type" : "markdown",
    "source" : "This is an example of new box, nothing useful but interesting to check in order to create new ones."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "48AE62905A054E5091B739C302CAC5C9"
    },
    "cell_type" : "code",
    "source" : "case class SumPipeComp(id:String = java.util.UUID.randomUUID.toString,\n                      mult:String=\"1\") \n  extends notebook.front.widgets.BoxPipeComponent[SumPipeComp]() {\n    \n  val name = \"sumPipe\"\n\n  val inPorts: List[String] = List(\"in\")\n  val outPorts: List[String] = List(\"out\")\n\n  val position:(Int, Int) = (100, 100)\n  val size:(Int, Int) = (100, 60)\n\n  val parameters = Map(\"mult\" → mult)\n  \n  def extractMult:Int = parameters(\"mult\").toInt\n\n  def next(a: Map[String,Any]): Map[String,Any] = {\n    a(\"in\") match {\n      case i:Int => Map(\"out\" → (extractMult * i))\n      case _ => ???\n    }\n  }\n\n  def merge(j:play.api.libs.json.JsValue):SumPipeComp = copy(\n    mult = (j \\ \"parameters\" \\ \"mult\").as[String]\n  )\n  \n  override val toString = toJSON.toString\n}",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "0B7438213CB64080BCDFA0E2E1B5B019"
    },
    "cell_type" : "markdown",
    "source" : "Then register this new box so it'll appear in the Flow UI (dropdown list so far)"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "F8EE7993A186467D8A6FA1792A0FF487"
    },
    "cell_type" : "code",
    "source" : "Flow.registerPipeComponent(\"sumPipe\", (() => SumPipeComp()))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "A78E3F3CD033415CA2A7594B80B755B8"
    },
    "cell_type" : "markdown",
    "source" : "Now use can use `sumPipe`"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "8453453E26154AF88F1504313C3872F3",
      "extra" : {
        "state" : [ {
          "name" : "sumPipe",
          "id" : "86ec777d-a970-4bef-af39-d3b613c5c7b9",
          "tpe" : "box",
          "parameters" : {
            "mult" : "1"
          },
          "inPorts" : [ "in" ],
          "outPorts" : [ "out" ],
          "position" : {
            "x" : 100,
            "y" : 100
          },
          "size" : {
            "width" : 100,
            "height" : 60
          },
          "remove" : false
        }, {
          "name" : "sumPipe",
          "id" : "8d25e574-38f4-43fa-bc53-7820f2725ef1",
          "tpe" : "box",
          "parameters" : {
            "mult" : "1"
          },
          "inPorts" : [ "in" ],
          "outPorts" : [ "out" ],
          "position" : {
            "x" : 100,
            "y" : 100
          },
          "size" : {
            "width" : 100,
            "height" : 60
          },
          "remove" : false
        } ]
      }
    },
    "cell_type" : "code",
    "source" : "val f2 = Flow()\nf2",
    "outputs" : [ ]
  } ],
  "nbformat" : 4
}